From 219fbc868689ae8a6c77ed64abf08859206310e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Darko=20Juri=C4=87?= Date: Fri, 29 Dec 2017 23:32:46 +0100 Subject: [PATCH] Async events, improvements in exception handling. --- Samples/RawMsgJs/Program.cs | 5 +- .../ClientServer/ArraySegmentExtensions.cs | 2 +- .../ClientServer/Connection.cs | 212 +++++++++++++----- .../Features/TimeoutBinder.cs | 10 +- .../Invokers/RemoteInvoker.cs | 16 +- .../RpcBinders/Base/Binder.cs | 4 +- .../RpcBinders/LocalBinder.cs | 6 +- .../ClientServer/Server.cs | 2 +- Tests/ConnectionException.cs | 32 +++ Tests/Program.cs | 47 ++++ Tests/RpcException.cs | 103 +++++++++ Tests/Tests.csproj | 16 ++ WebsocketRPC.sln | 7 + 13 files changed, 389 insertions(+), 73 deletions(-) create mode 100644 Tests/ConnectionException.cs create mode 100644 Tests/Program.cs create mode 100644 Tests/RpcException.cs create mode 100644 Tests/Tests.csproj diff --git a/Samples/RawMsgJs/Program.cs b/Samples/RawMsgJs/Program.cs index 3a1f845..a495107 100644 --- a/Samples/RawMsgJs/Program.cs +++ b/Samples/RawMsgJs/Program.cs @@ -2,6 +2,7 @@ using System; using System.IO; using System.Threading; +using System.Threading.Tasks; using WebSocketRPC; namespace RawMsgJs @@ -30,8 +31,8 @@ static void Main(string[] args) c.BindTimeout(TimeSpan.FromSeconds(30)); c.OnOpen += async () => await c.SendAsync("Hello from server using WebSocketRPC"); - c.OnClose += () => Console.WriteLine("Connection closed."); - c.OnError += e => Console.WriteLine("Error: " + e.Message); + c.OnClose += () => { Console.WriteLine("Connection closed."); return Task.FromResult(true); }; + c.OnError += e => { Console.WriteLine("Error: " + e.Message); return Task.FromResult(true); }; c.OnReceive += async msg => { diff --git a/Source/WebSocketRPC.Base/ClientServer/ArraySegmentExtensions.cs b/Source/WebSocketRPC.Base/ClientServer/ArraySegmentExtensions.cs index 297c352..f026c7e 100644 --- a/Source/WebSocketRPC.Base/ClientServer/ArraySegmentExtensions.cs +++ b/Source/WebSocketRPC.Base/ClientServer/ArraySegmentExtensions.cs @@ -31,7 +31,7 @@ namespace WebSocketRPC /// /// Provides the helper methods. /// - public static class HelperExtensions + static class HelperExtensions { /// /// Converts the specified binary data to a string data using the specified encoding. diff --git a/Source/WebSocketRPC.Base/ClientServer/Connection.cs b/Source/WebSocketRPC.Base/ClientServer/Connection.cs index dced31c..53f95f6 100644 --- a/Source/WebSocketRPC.Base/ClientServer/Connection.cs +++ b/Source/WebSocketRPC.Base/ClientServer/Connection.cs @@ -30,6 +30,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Linq; namespace WebSocketRPC { @@ -38,6 +39,8 @@ namespace WebSocketRPC /// public class Connection { + #region Global settings + static int maxMessageSize = 64 * 1024; //x KiB /// /// Gets or sets the maximum message size in bytes [1..Int32.MaxValue]. @@ -72,6 +75,8 @@ public static Encoding Encoding static string messageToBig = "The message exceeds the maximum allowed message size: {0} bytes."; + #endregion + WebSocket socket; TaskQueue sendTaskQueue; @@ -92,22 +97,99 @@ internal protected Connection(WebSocket socket, IReadOnlyDictionary public IReadOnlyDictionary Cookies { get; private set; } + #region Events + /// /// Message receive event. Message is decoded using . /// - public event Action OnReceive; + public event Func OnReceive; /// /// Open event. /// - public event Action OnOpen; + public event Func OnOpen; /// /// Close event. /// - public event Action OnClose; + public event Func OnClose; /// /// Error event Args: exception. /// - public event Action OnError; + public event Func OnError; + + /// + /// Invokes the error event. + /// + /// Exception. + internal void InvokeOnError(Exception exception) + { + if (OnError == null || exception == null) + return; + + try + { + var members = OnError.GetInvocationList().Cast>(); + + Task.WhenAll(members.Select(x => x(exception))) + .Wait(0); + } + catch (Exception ex) when (ex.InnerException is TaskCanceledException) + { } + } + + private void invokeOnOpen() + { + if (OnOpen == null) + return; + + try + { + var members = OnOpen.GetInvocationList().Cast>(); + + Task.WhenAll(members.Select(x=> x())) + .ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted) + .Wait(0); + } + catch (Exception ex) when (ex.InnerException is TaskCanceledException) + { } + } + + private void invokeOnReceive(string msg) + { + if (OnReceive == null) + return; + + try + { + var members = OnReceive.GetInvocationList().Cast>(); + + Task.WhenAll(members.Select(x => x(msg))) + .ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted) + .Wait(0); + } + catch (Exception ex) when (ex.InnerException is TaskCanceledException) + { } + } + + private void invokeOnClose() + { + if (OnClose == null) + return; + + try + { + var members = OnClose.GetInvocationList().Cast>(); + + Task.WhenAll(members.Select(x => x())) + .ContinueWith(t => InvokeOnError(t.Exception), TaskContinuationOptions.OnlyOnFaulted) + .Wait(0); + } + catch (Exception ex) when (ex.InnerException is TaskCanceledException) + { } + } + + #endregion + + #region Send /// /// Sends the specified data as the text message type. @@ -151,6 +233,10 @@ async Task sendAsync(ArraySegment data, WebSocketMessageType msgType) } } + #endregion + + #region Close + /// /// Closes the connection. /// @@ -164,15 +250,28 @@ public async Task CloseAsync(WebSocketCloseStatus closeStatus = WebSocketCloseSt if (socket.State == WebSocketState.Open || socket.State == WebSocketState.CloseReceived) await socket.CloseOutputAsync(closeStatus, statusDescription, CancellationToken.None); } - catch - { } //do not propagate the exception + catch(Exception ex) + { + InvokeOnError(ex); + } finally { - OnClose?.Invoke(); + invokeOnClose(); clearEvents(); } } + private void clearEvents() + { + OnClose = null; + OnError = null; + OnReceive = null; + } + + #endregion + + #region Receive (listen) + /// /// Listens for the receive messages for the specified connection. /// @@ -184,68 +283,67 @@ internal async Task ListenReceiveAsync(CancellationToken token) { try { - OnOpen?.Invoke(); - byte[] receiveBuffer = new byte[MaxMessageSize]; - - while (socket.State == WebSocketState.Open) - { - WebSocketReceiveResult receiveResult = null; - var count = 0; - do - { - var segment = new ArraySegment(receiveBuffer, count, MaxMessageSize - count); - receiveResult = await socket.ReceiveAsync(segment, CancellationToken.None); - count += receiveResult.Count; - - if (count >= MaxMessageSize) - { - await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, MaxMessageSize)); - return; - } - } - while (receiveResult?.EndOfMessage == false); - - - if (receiveResult.MessageType == WebSocketMessageType.Close) - { - await CloseAsync(); - } - else - { - var segment = new ArraySegment(receiveBuffer, 0, count); - var msg = segment.ToString(Encoding); - OnReceive?.Invoke(msg); - Debug.WriteLine("Received: " + msg); - } - - if (token.IsCancellationRequested) - break; - } + await listenReceiveAsync(token); } catch (Exception ex) { - OnError?.Invoke(ex); + InvokeOnError(ex); await CloseAsync(WebSocketCloseStatus.InternalServerError, ex.Message); //socket will be aborted -> no need to close manually } } } - /// - /// Invokes the error event. - /// - /// Exception. - internal void InvokeError(Exception ex) + async Task listenReceiveAsync(CancellationToken token) { - OnError?.Invoke(ex); - } + invokeOnOpen(); + byte[] receiveBuffer = new byte[maxMessageSize]; - private void clearEvents() - { - OnClose = null; - OnError = null; - OnReceive = null; + while (socket.State == WebSocketState.Open) + { + //receive + WebSocketReceiveResult receiveResult = null; + var count = 0; + do + { + var segment = new ArraySegment(receiveBuffer, count, maxMessageSize - count); + receiveResult = await socket.ReceiveAsync(segment, CancellationToken.None); + count += receiveResult.Count; + + if (count >= maxMessageSize) + { + await CloseAsync(WebSocketCloseStatus.MessageTooBig, String.Format(messageToBig, maxMessageSize)); + return; + } + } + while (receiveResult?.EndOfMessage == false); + + //process response + switch (receiveResult.MessageType) + { + case WebSocketMessageType.Close: + await CloseAsync(); + break; + case WebSocketMessageType.Binary: + InvokeOnError(new NotSupportedException($"Binary messages are not supported. Received {count} bytes.")); + Debug.WriteLine("Received binary."); + break; + default: + var segment = new ArraySegment(receiveBuffer, 0, count); + var msg = segment.ToString(encoding); + + invokeOnReceive(msg); + Debug.WriteLine("Received: " + msg); + break; + } + + //check if cancellation is requested + if (token.IsCancellationRequested) + break; + } } + + #endregion } } diff --git a/Source/WebSocketRPC.Base/Features/TimeoutBinder.cs b/Source/WebSocketRPC.Base/Features/TimeoutBinder.cs index 3519f2d..be446d9 100644 --- a/Source/WebSocketRPC.Base/Features/TimeoutBinder.cs +++ b/Source/WebSocketRPC.Base/Features/TimeoutBinder.cs @@ -1,5 +1,6 @@ using System; using System.Collections.Generic; +using System.Threading.Tasks; using System.Timers; namespace WebSocketRPC @@ -28,11 +29,12 @@ public TimeoutBinder(Connection connection, TimeSpan timeout, string closeMessag timer.AutoReset = false; timer.Elapsed += Timer_Elapsed; - connection.OnOpen += () => timer.Enabled = true; + connection.OnOpen += () => { timer.Enabled = true; return Task.FromResult(true); }; connection.OnReceive += msg => { timer.Enabled = false; timer.Enabled = true; + return Task.FromResult(true); }; } @@ -61,7 +63,11 @@ public static void BindTimeout(this Connection connection, TimeSpan timeout, str var binder = new TimeoutBinder(connection, timeout, closeMessage); lock (timeoutBinders) timeoutBinders.Add(binder); - connection.OnClose += () => { lock (timeoutBinders) timeoutBinders.Remove(binder); }; + connection.OnClose += () => + { + lock (timeoutBinders) timeoutBinders.Remove(binder); + return Task.FromResult(true); + }; } } } diff --git a/Source/WebSocketRPC.Base/Invokers/RemoteInvoker.cs b/Source/WebSocketRPC.Base/Invokers/RemoteInvoker.cs index 2fd51e2..f64f0ef 100644 --- a/Source/WebSocketRPC.Base/Invokers/RemoteInvoker.cs +++ b/Source/WebSocketRPC.Base/Invokers/RemoteInvoker.cs @@ -56,13 +56,13 @@ public RemoteInvoker() static void verifyType() { if (!typeof(TInterface).IsInterface) - throw new Exception("The specified type must be an interface type."); + throw new Exception($"The specified type '{typeof(TInterface).Name}' must be an interface type."); var methodList = typeof(TInterface).GetMethods(BindingFlags.Public | BindingFlags.Instance); var propertyList = typeof(TInterface).GetProperties(BindingFlags.Public | BindingFlags.Instance); if (propertyList.Any()) - throw new NotSupportedException("The interface must not declare any properties: " + String.Join(", ", propertyList.Select(x => x.Name)) + "."); + throw new NotSupportedException($"The interface '{typeof(TInterface).Name}' must not declare any properties: { String.Join(", ", propertyList.Select(x => x.Name)) }."); } @@ -89,7 +89,7 @@ public async Task InvokeAsync(Expression> functionExpression) var response = await invokeAsync(funcName, argVals); if (response.Error != null) - throw new Exception(response.Error); + throw new Exception($"Exception thrown while calling {funcName}. Message: {response.Error}."); } public async Task InvokeAsync(Expression> functionExpression) @@ -97,10 +97,10 @@ public async Task InvokeAsync(Expression(RPC.Serializer); if (response.Error != null) - throw new Exception(response.Error); + throw new Exception($"Exception thrown while calling {funcName}. Message: {response.Error}."); + var result = response.ReturnValue.ToObject(RPC.Serializer); return result; } @@ -110,7 +110,7 @@ public async Task InvokeAsync(Expression> functionExpress var response = await invokeAsync(funcName, argVals); if (response.Error != null) - throw new Exception(response.Error); + throw new Exception($"Exception thrown while calling {funcName}. Message: {response.Error}."); } public async Task InvokeAsync(Expression>> functionExpression) @@ -118,10 +118,10 @@ public async Task InvokeAsync(Expression(RPC.Serializer); if (response.Error != null) - throw new Exception(response.Error); + throw new Exception($"Exception thrown while calling {funcName}. Message: {response.Error}."); + var result = response.ReturnValue.ToObject(RPC.Serializer); return result; } diff --git a/Source/WebSocketRPC.Base/RpcBinders/Base/Binder.cs b/Source/WebSocketRPC.Base/RpcBinders/Base/Binder.cs index 90e316b..3853577 100644 --- a/Source/WebSocketRPC.Base/RpcBinders/Base/Binder.cs +++ b/Source/WebSocketRPC.Base/RpcBinders/Base/Binder.cs @@ -25,7 +25,7 @@ using System; using System.Diagnostics; - +using System.Threading.Tasks; namespace WebSocketRPC { @@ -43,6 +43,7 @@ protected Binder(Connection connection) Debug.WriteLine("Close"); lock (RPC.AllBinders) RPC.AllBinders.Remove(this); + return Task.FromResult(true); }; Connection.OnError += e => @@ -50,6 +51,7 @@ protected Binder(Connection connection) Debug.WriteLine("Error"); lock (RPC.AllBinders) RPC.AllBinders.Remove(this); + return Task.FromResult(true); }; } } diff --git a/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs b/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs index 051aefa..259c284 100644 --- a/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs +++ b/Source/WebSocketRPC.Base/RpcBinders/LocalBinder.cs @@ -24,6 +24,7 @@ #endregion using System; +using System.Threading.Tasks; namespace WebSocketRPC { @@ -48,7 +49,10 @@ public LocalBinder(Connection connection, TObj obj) Connection.OnClose += () => { - (Object as IDisposable)?.Dispose(); + try { (Object as IDisposable)?.Dispose(); } + catch (Exception ex) { Connection.InvokeOnError(ex); } + + return Task.FromResult(true); }; } diff --git a/Source/WebsocketRPC.Standalone/ClientServer/Server.cs b/Source/WebsocketRPC.Standalone/ClientServer/Server.cs index becc900..7ca16ed 100644 --- a/Source/WebsocketRPC.Standalone/ClientServer/Server.cs +++ b/Source/WebsocketRPC.Standalone/ClientServer/Server.cs @@ -206,7 +206,7 @@ static async Task listenAsync(HttpListenerContext ctx, CancellationToken token, } catch (Exception ex) { - connection.InvokeError(ex); + connection.InvokeOnError(ex); } finally { diff --git a/Tests/ConnectionException.cs b/Tests/ConnectionException.cs new file mode 100644 index 0000000..59a02bc --- /dev/null +++ b/Tests/ConnectionException.cs @@ -0,0 +1,32 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using WebSocketRPC; + +namespace Tests +{ + partial class Program + { + static Task[] TestConnectionUnhandledException(CancellationTokenSource cts) + { + var ts = Server.ListenAsync($"http://{address}", cts.Token, (c, wc) => c.Bind(new ServiceAPI())); + var tc = Client.ConnectAsync($"ws://{address}", cts.Token, c => + { + c.Bind(); + c.OnOpen += () => + { + throw new NotImplementedException(); + }; + + c.OnError += e => + { + Console.WriteLine("Error: " + e.Message); + return Task.FromResult(true); + }; + }, + reconnectOnError: false); + + return new Task[] { ts, tc }; + } + } +} diff --git a/Tests/Program.cs b/Tests/Program.cs new file mode 100644 index 0000000..7ab326f --- /dev/null +++ b/Tests/Program.cs @@ -0,0 +1,47 @@ +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace Tests +{ + public partial class Program + { + const string address = "localhost:8001/"; + + static void RunTest(Func testFunc) + { + var cts = new CancellationTokenSource(); + var tasks = testFunc(cts); + + foreach (var t in tasks) + t.ContinueWith(_ => cts.Cancel(), TaskContinuationOptions.NotOnRanToCompletion); + + Task.Run(() => + { + //Console.WriteLine("Press [Enter] or [Ctrl+C] to quit."); + Console.ReadLine(); + + cts.Cancel(); + Console.WriteLine("Waiting for the tasks to finish."); + }); + + try + { + Task.WaitAll(tasks); + } + catch (Exception ex) + { + Console.WriteLine("Task completion error: {0}", ex?.Message); + } + } + + static void Main(string[] args) + { + //RunTest(TestConnectionException); + + RunTest(TestRpcInitializeException); + //RunTest(TestRpcUnhandledException); + //RunTest(TestRpcHandledException); + } + } +} diff --git a/Tests/RpcException.cs b/Tests/RpcException.cs new file mode 100644 index 0000000..3b751ff --- /dev/null +++ b/Tests/RpcException.cs @@ -0,0 +1,103 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using WebSocketRPC; + +namespace Tests +{ + partial class Program + { + interface IServiceAPI + { + int LongRunningTask(int a, int b); + } + + class ServiceAPI : IServiceAPI + { + public int LongRunningTask(int a, int b) + { + throw new NotImplementedException("The method is not implemented."); + } + } + + static Task[] TestRpcInitializeException(CancellationTokenSource cts) + { + var ts = Server.ListenAsync($"http://{address}", cts.Token, (c, wc) => c.Bind(new ServiceAPI())); + var tc = Client.ConnectAsync($"ws://{address}", cts.Token, c => + { + c.Bind(); + c.OnOpen += async () => + { + try + { + var results = await RPC.For().CallAsync(x => x.LongRunningTask(1, 2)); + } + catch (Exception ex) + { + Console.WriteLine("Error while executing {0}: {1}", nameof(ServiceAPI.LongRunningTask), ex.Message); + } + }; + + c.OnError += e => + { + Console.WriteLine("Error: " + e.Message); + return Task.FromResult(true); + }; + }, + reconnectOnError: false); + + return new Task[] { ts, tc }; + } + + static Task[] TestRpcUnhandledException(CancellationTokenSource cts) + { + var ts = Server.ListenAsync($"http://{address}", cts.Token, (c, wc) => c.Bind(new ServiceAPI())); + var tc = Client.ConnectAsync($"ws://{address}", cts.Token, c => + { + c.Bind(); + c.OnOpen += async () => + { + var results = await RPC.For().CallAsync(x => x.LongRunningTask(1, 2)); + }; + + c.OnError += e => + { + Console.WriteLine("Error: " + e.Message); + return Task.FromResult(true); + }; + }, + reconnectOnError: false); + + return new Task[] { ts, tc }; + } + + static Task[] TestRpcHandledException(CancellationTokenSource cts) + { + var ts = Server.ListenAsync($"http://{address}", cts.Token, (c, wc) => c.Bind(new ServiceAPI())); + var tc = Client.ConnectAsync($"ws://{address}", cts.Token, c => + { + c.Bind(); + c.OnOpen += async () => + { + try + { + var results = await RPC.For().CallAsync(x => x.LongRunningTask(1, 2)); + } + catch (Exception ex) + { + Console.WriteLine("Error while executing {0}: {1}", nameof(ServiceAPI.LongRunningTask), ex.Message); + } + }; + + c.OnError += e => + { + Console.WriteLine("Error: " + e.Message); + return Task.FromResult(true); + }; + }, + reconnectOnError: false); + + return new Task[] { ts, tc }; + } + } +} diff --git a/Tests/Tests.csproj b/Tests/Tests.csproj new file mode 100644 index 0000000..6b3685c --- /dev/null +++ b/Tests/Tests.csproj @@ -0,0 +1,16 @@ + + + netcoreapp2.0;net47 + + + + bin\ + + Exe + + + + + + + \ No newline at end of file diff --git a/WebsocketRPC.sln b/WebsocketRPC.sln index a6a571f..ac61e48 100644 --- a/WebsocketRPC.sln +++ b/WebsocketRPC.sln @@ -65,6 +65,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "RawMsgJs", "Samples\RawMsgJ EndProject Project("{D954291E-2A0B-460D-934E-DC6B0785DB48}") = "SampleBase", "Samples\SampleBase\SampleBase.shproj", "{4C3C0292-F1F5-4997-8A0E-B4C7D05BB22F}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Tests", "Tests\Tests.csproj", "{96225EA6-4EA7-48AB-91F9-E53B0B5605F0}" +EndProject Global GlobalSection(SharedMSBuildProjectFiles) = preSolution Samples\SampleBase\SampleBase.projitems*{4c3c0292-f1f5-4997-8a0e-b4c7d05bb22f}*SharedItemsImports = 13 @@ -115,6 +117,10 @@ Global {76FDC1DC-D80A-42E8-8A5D-2FE3462B9E3F}.Debug|Any CPU.Build.0 = Debug|Any CPU {76FDC1DC-D80A-42E8-8A5D-2FE3462B9E3F}.Release|Any CPU.ActiveCfg = Release|Any CPU {76FDC1DC-D80A-42E8-8A5D-2FE3462B9E3F}.Release|Any CPU.Build.0 = Release|Any CPU + {96225EA6-4EA7-48AB-91F9-E53B0B5605F0}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {96225EA6-4EA7-48AB-91F9-E53B0B5605F0}.Debug|Any CPU.Build.0 = Debug|Any CPU + {96225EA6-4EA7-48AB-91F9-E53B0B5605F0}.Release|Any CPU.ActiveCfg = Release|Any CPU + {96225EA6-4EA7-48AB-91F9-E53B0B5605F0}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -135,6 +141,7 @@ Global {CFC334A5-2B52-42A0-87AE-4B3F84B09530} = {E5509F73-1E5E-45B4-AED7-4A38F8DF1DDE} {76FDC1DC-D80A-42E8-8A5D-2FE3462B9E3F} = {E5509F73-1E5E-45B4-AED7-4A38F8DF1DDE} {4C3C0292-F1F5-4997-8A0E-B4C7D05BB22F} = {E5509F73-1E5E-45B4-AED7-4A38F8DF1DDE} + {96225EA6-4EA7-48AB-91F9-E53B0B5605F0} = {01049849-1A9A-4C3A-BD56-EA6B628F9F36} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {591A6475-8DF2-42DA-AFF1-8EF88BCF6EE4}