From 051e219866cefb12d3c8fab07e312223bef2792d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Darko=20Juri=C4=87?= Date: Sat, 14 Oct 2017 04:14:19 +0200 Subject: [PATCH] Async function support; action support. --- Samples/ServerClientSample/Client/Program.cs | 8 +- Samples/ServerClientSample/Server/Program.cs | 2 +- .../WebsocketRPC/ClientServer/Connection.cs | 3 +- Source/WebsocketRPC/ClientServer/Server.cs | 11 --- .../WebsocketRPC/ConnectionBinders/Binder.cs | 15 ++++ .../ConnectionBinders/BinderBase.cs | 25 +++++- .../ConnectionBinders/RemoteBinder.cs | 15 ++++ Source/WebsocketRPC/Invokers/LocalInvoker.cs | 33 +++++++- Source/WebsocketRPC/Invokers/RemoteInvoker.cs | 49 ++++++++++-- Source/WebsocketRPC/RPC.cs | 80 +++++++++++++++++-- 10 files changed, 205 insertions(+), 36 deletions(-) diff --git a/Samples/ServerClientSample/Client/Program.cs b/Samples/ServerClientSample/Client/Program.cs index 3bcaea0..9fcf1ec 100644 --- a/Samples/ServerClientSample/Client/Program.cs +++ b/Samples/ServerClientSample/Client/Program.cs @@ -1,21 +1,21 @@ using System; using System.Linq; using System.Threading; +using System.Threading.Tasks; using WebsocketRPC; namespace TestClient { public interface ILocalAPI { - int LongRunningTask(int a, int b); + Task LongRunningTask(int a, int b); } public class RemoteAPI //:IRemoteAPI { - public bool WriteProgress(float progress) + public void WriteProgress(float progress) { Console.WriteLine("\rCompleted: {0}%.", progress * 100); - return true; } } @@ -37,7 +37,7 @@ static void Main(string[] args) c.OnOpen += async () => { var r = await RPC.For().CallAsync(x => x.LongRunningTask(5, 3)); - Console.WriteLine("Result: " + r.First()); + Console.WriteLine("Result: " + r.First().Result); }; }) .Wait(0); diff --git a/Samples/ServerClientSample/Server/Program.cs b/Samples/ServerClientSample/Server/Program.cs index 659ab05..8ddcfa7 100644 --- a/Samples/ServerClientSample/Server/Program.cs +++ b/Samples/ServerClientSample/Server/Program.cs @@ -7,7 +7,7 @@ namespace TestServer { interface IRemoteAPI { - bool WriteProgress(float progress); + void WriteProgress(float progress); } class LocalAPI diff --git a/Source/WebsocketRPC/ClientServer/Connection.cs b/Source/WebsocketRPC/ClientServer/Connection.cs index 8b92980..18492b0 100644 --- a/Source/WebsocketRPC/ClientServer/Connection.cs +++ b/Source/WebsocketRPC/ClientServer/Connection.cs @@ -140,7 +140,7 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio { try { - byte[] receiveBuffer = new byte[1024]; + byte[] receiveBuffer = new byte[RPCSettings.MaxMessageSize]; while (webSocket.State == WebSocketState.Open) { @@ -178,6 +178,7 @@ internal static async Task ListenReceiveAsync(Connection connection, Cancellatio { while (ex.InnerException != null) ex = ex.InnerException; connection.OnError?.Invoke(ex); + connection.OnClose?.Invoke(); //socket will be aborted -> no need to close manually } } diff --git a/Source/WebsocketRPC/ClientServer/Server.cs b/Source/WebsocketRPC/ClientServer/Server.cs index b4367fa..ba0a558 100644 --- a/Source/WebsocketRPC/ClientServer/Server.cs +++ b/Source/WebsocketRPC/ClientServer/Server.cs @@ -31,17 +31,6 @@ namespace WebsocketRPC { - //sample: - /*Server.ListenAsync("http://+:8001/", CancellationToken.None, c => - { - c.OnOpenAsync = () => Task.Run(() => Console.WriteLine("Connection opened.")); - c.OnErrorAsync = e => Task.Run(() => Console.WriteLine("Error: " + e.Message)); - c.OnCloseAsync = () => Task.Run(() => Console.WriteLine("Connection closed.")); - - c.OnReceiveAsync = (s, d) => Task.Run(() => Console.WriteLine("Received: " + d.AsString(Encoding.UTF8))); - }) - .Wait();*/ - /// /// Websocket server. /// diff --git a/Source/WebsocketRPC/ConnectionBinders/Binder.cs b/Source/WebsocketRPC/ConnectionBinders/Binder.cs index 46a2410..28766fc 100644 --- a/Source/WebsocketRPC/ConnectionBinders/Binder.cs +++ b/Source/WebsocketRPC/ConnectionBinders/Binder.cs @@ -74,9 +74,24 @@ public Binder(Connection connection, TObj obj) public TObj Object { get; private set; } + public async Task CallAsync(Expression> functionExpression) + { + await rInvoker.InvokeAsync(functionExpression); + } + public async Task CallAsync(Expression> functionExpression) { return await rInvoker.InvokeAsync(functionExpression); } + + public async Task CallAsync(Expression> functionExpression) + { + return await rInvoker.InvokeAsync(functionExpression); + } + + public async Task> CallAsync(Expression>> functionExpression) + { + return await rInvoker.InvokeAsync(functionExpression); + } } } diff --git a/Source/WebsocketRPC/ConnectionBinders/BinderBase.cs b/Source/WebsocketRPC/ConnectionBinders/BinderBase.cs index 0c5add5..f6698ba 100644 --- a/Source/WebsocketRPC/ConnectionBinders/BinderBase.cs +++ b/Source/WebsocketRPC/ConnectionBinders/BinderBase.cs @@ -62,10 +62,33 @@ public interface IRemoteBinder : IBinder /// /// Calls the RPC method. /// - /// Result. /// Method getter. /// RPC invoking task. + Task CallAsync(Expression> functionExpression); + + /// + /// Calls the RPC method. + /// + /// Result. + /// Method getter. + /// RPC result. Task CallAsync(Expression> functionExpression); + + + /// + /// Calls the RPC method. + /// + /// Method getter. + /// RPC task result. + Task CallAsync(Expression> functionExpression); + + /// + /// Calls the RPC method. + /// + /// Result. + /// Method getter. + /// RPC result. + Task> CallAsync(Expression>> functionExpression); } /// diff --git a/Source/WebsocketRPC/ConnectionBinders/RemoteBinder.cs b/Source/WebsocketRPC/ConnectionBinders/RemoteBinder.cs index ecf352d..e298c00 100644 --- a/Source/WebsocketRPC/ConnectionBinders/RemoteBinder.cs +++ b/Source/WebsocketRPC/ConnectionBinders/RemoteBinder.cs @@ -58,9 +58,24 @@ public RemoteBinder(Connection connection) }; } + public async Task CallAsync(Expression> functionExpression) + { + await rInvoker.InvokeAsync(functionExpression); + } + public async Task CallAsync(Expression> functionExpression) { return await rInvoker.InvokeAsync(functionExpression); } + + public async Task CallAsync(Expression> functionExpression) + { + return await rInvoker.InvokeAsync(functionExpression); + } + + public async Task> CallAsync(Expression>> functionExpression) + { + return await rInvoker.InvokeAsync(functionExpression); + } } } diff --git a/Source/WebsocketRPC/Invokers/LocalInvoker.cs b/Source/WebsocketRPC/Invokers/LocalInvoker.cs index a68c4da..117c6c1 100644 --- a/Source/WebsocketRPC/Invokers/LocalInvoker.cs +++ b/Source/WebsocketRPC/Invokers/LocalInvoker.cs @@ -95,8 +95,21 @@ public async Task InvokeAsync(TObj obj, Request clientMessage) try { - var returnVal = await invokeAsync(methods[functionName], obj, argObjs); - var result = (returnVal != null) ? JToken.FromObject(returnVal, RPCSettings.Serializer) : null; + var hasResult = methods[functionName].ReturnType != typeof(void) && + methods[functionName].ReturnType != typeof(Task); + + JToken result = null; + if (hasResult) + { + var returnVal = await invokeWithResultAsync(methods[functionName], obj, argObjs); + result = (returnVal != null) ? JToken.FromObject(returnVal, RPCSettings.Serializer) : null; + } + else + { + await invokeAsync(methods[functionName], obj, argObjs); + result = JToken.FromObject(true); + } + return (result, null); } catch (Exception ex) @@ -106,7 +119,19 @@ public async Task InvokeAsync(TObj obj, Request clientMessage) } } - async Task invokeAsync(MethodInfo method, TObj obj, object[] args) + async Task invokeAsync(MethodInfo method, TObj obj, object[] args) + { + object returnVal = method.Invoke(obj, args); + + //async method support + if (method.GetCustomAttribute() != null) + { + var task = (Task)returnVal; + await task.ConfigureAwait(false); + } + } + + async Task invokeWithResultAsync(MethodInfo method, TObj obj, object[] args) { object returnVal = method.Invoke(obj, args); @@ -122,5 +147,7 @@ async Task invokeAsync(MethodInfo method, TObj obj, object[] args) return returnVal; } + + } } diff --git a/Source/WebsocketRPC/Invokers/RemoteInvoker.cs b/Source/WebsocketRPC/Invokers/RemoteInvoker.cs index 02e58ce..eb0adde 100644 --- a/Source/WebsocketRPC/Invokers/RemoteInvoker.cs +++ b/Source/WebsocketRPC/Invokers/RemoteInvoker.cs @@ -83,14 +83,52 @@ public void Receive(Response response) runningMethods[response.FunctionName].SetResult(response); } + + public async Task InvokeAsync(Expression> functionExpression) + { + var (funcName, argVals) = getFunctionInfo(functionExpression); + var response = await invokeAsync(funcName, argVals); + + if (response.Error != null) + throw new Exception(response.Error); + } + public async Task InvokeAsync(Expression> functionExpression) { var (funcName, argVals) = getFunctionInfo(functionExpression); - var r = await invokeAsync(funcName, argVals); - return r.Result; + var response = await invokeAsync(funcName, argVals); + + var result = response.ReturnValue.ToObject(RPCSettings.Serializer); + if (response.Error != null) + throw new Exception(response.Error); + + return result; + } + + public async Task InvokeAsync(Expression> functionExpression) + { + var (funcName, argVals) = getFunctionInfo(functionExpression); + var response = await invokeAsync(funcName, argVals); + + if (response.Error != null) + throw new Exception(response.Error); + + return Task.FromResult(true); + } + + public async Task> InvokeAsync(Expression>> functionExpression) + { + var (funcName, argVals) = getFunctionInfo(functionExpression); + var response = await invokeAsync(funcName, argVals); + + var result = response.ReturnValue.ToObject(RPCSettings.Serializer); + if (response.Error != null) + throw new Exception(response.Error); + + return Task.FromResult(result); } - async Task<(TResult Result, Exception Error)> invokeAsync(string name, params object[] args) + async Task invokeAsync(string name, params object[] args) { if (sendAsync == null) throw new Exception("The invoker is not initialized."); @@ -108,10 +146,7 @@ public async Task InvokeAsync(Expression(RPCSettings.Serializer); - var ex = response.Error; - - return (result, (ex != null) ? new Exception(ex) : null); + return response; } //the idea is taken from: https://stackoverflow.com/questions/3766698/get-end-values-from-lambda-expressions-method-parameters?rq=1 diff --git a/Source/WebsocketRPC/RPC.cs b/Source/WebsocketRPC/RPC.cs index 9b98a43..0bb42fe 100644 --- a/Source/WebsocketRPC/RPC.cs +++ b/Source/WebsocketRPC/RPC.cs @@ -79,7 +79,7 @@ public static IBinder Bind(this Connection connection) } /// - /// Gets all two-way or one-way sending binders. + /// Gets all two-way or one-way remote binders. /// /// Interface type. /// Binders. @@ -113,6 +113,37 @@ public static IEnumerable> For(object obj) return binders; } + /// + /// Gets whether the data contain RPC message or not. + /// + /// Received data. + /// True if the data contain RPC message, false otherwise. + public static bool IsRPC(this ArraySegment data) + { + var str = data.ToString(Encoding.ASCII); + return !Request.FromJson(str).IsEmpty || !Response.FromJson(str).IsEmpty; + } + + + /// + /// Calls the remote method. + /// + /// Interface type. + /// Remote binder collection. + /// Method getter. + /// The RPC task. + public static async Task CallAsync(this IEnumerable> binders, Expression> functionExpression) + { + var tasks = new List(); + foreach (var b in binders) + { + var t = b.CallAsync(functionExpression); + tasks.Add(t); + } + + await Task.WhenAll(tasks); + } + /// /// Calls the remote method. /// @@ -120,7 +151,7 @@ public static IEnumerable> For(object obj) /// Method result type. /// Remote binder collection. /// Method getter. - /// The collection of the RPC invoking tasks. + /// The collection of results. public static async Task CallAsync(this IEnumerable> binders, Expression> functionExpression) { var tasks = new List>(); @@ -139,14 +170,47 @@ public static async Task CallAsync(this IEnumera } /// - /// Gets whether the data contain RPC message or not. + /// Calls the remote method. /// - /// Received data. - /// True if the data contain RPC message, false otherwise. - public static bool IsRPC(this ArraySegment data) + /// Interface type. + /// Remote binder collection. + /// Method getter. + /// The RPC task. + public static async Task CallAsync(this IEnumerable> binders, Expression> functionExpression) { - var str = data.ToString(Encoding.ASCII); - return !Request.FromJson(str).IsEmpty || !Response.FromJson(str).IsEmpty; + var tasks = new List(); + foreach (var b in binders) + { + var t = b.CallAsync(functionExpression); + tasks.Add(t); + } + + await Task.WhenAll(tasks); + } + + /// + /// Calls the remote method. + /// + /// Interface type. + /// Method result type. + /// Remote binder collection. + /// Method getter. + /// The collection of results. + public static async Task[]> CallAsync(this IEnumerable> binders, Expression>> functionExpression) + { + var tasks = new List>>(); + foreach (var b in binders) + { + var t = b.CallAsync(functionExpression); + tasks.Add(t); + } + + await Task.WhenAll(tasks); + var results = tasks.Where(x => x.Status == TaskStatus.RanToCompletion) + .Select(x => x.Result) + .ToArray(); + + return results; } } }